Handle Absinthe unsubscriptions #228

Merged

pzingg
Contributor

@pzingg pzingg commented Aug 13, 2021

Adds support for Absinthe GraphQL subscriptions, handling unsubscribe requests and replies.

  1. Fixes Phoenix transport: breaking early from async generator loop requires canceling receive_data_task #225
    a. Handles TransportClosed exception when the transport is closed before a subscription is unsubscribed.
    b. Removes the listener when the unsubscribe reply is received from server.

  2. Implements the Absinthe unsubscribe protocol in the mock servers in tests/test_phoenix_channel_subscription.py.

@pzingg pzingg changed the title Fix phoenix subscriptions close Handle Absinthe unsubscriptions Aug 13, 2021
@leszekhanusz
Collaborator

You can run make check to fix linting issues

@pzingg
Contributor Author

pzingg commented Aug 13, 2021

Thanks. I probably need to add an aclose() call for the Python 3.6 tests as well.

@pzingg
Contributor Author

pzingg commented Aug 13, 2021

I installed all the dev requirements, and everything checks and passes now. However, my version of the Python formatter black (21.7b0) reformatted 50 files! Should I commit them all, install black version 19.10b0 and try again, or is there a way to skip reformatting files that I have not edited?

@leszekhanusz
Collaborator

It is explained in the CONTRIBUTING.md file.
You can install the correct version of black by running pip install -e.[dev]

Once it is done, running make check again will fix everything.

@pzingg
Contributor Author

pzingg commented Aug 14, 2021

Tests and checks now pass locally with the tool versions installed by pip install -e.[dev].

@codecov-commenter

codecov-commenter commented Aug 14, 2021

Codecov Report

Merging #228 (b740d58) into master (66174a6) will not change coverage.
The diff coverage is 100.00%.

@@            Coverage Diff            @@
##            master      #228   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files           16        16           
  Lines         1067      1149   +82     
=========================================
+ Hits          1067      1149   +82     
Impacted Files Coverage Δ
gql/transport/phoenix_channel_websockets.py 100.00% <100.00%> (ø)
gql/transport/websockets.py 100.00% <100.00%> (ø)

@pzingg
Contributor Author

pzingg commented Aug 14, 2021

I'm thinking perhaps I should create another exception class, TransportSubscriptionError, to handle the potential error that Codecov caught. It could happen if the Phoenix Channel transport had somehow removed a subscription listener but then received an unsubscribe message still referring to it. That would be something wrong on the transport's part, but it's not one of the existing TransportErrors. I can write another test to cover it. Any comments?

@pzingg
Contributor Author

pzingg commented Aug 16, 2021

My last commit is an attempt to handle more edge cases around the GraphQL and Phoenix Channel specs. I'm going hands off for a while. All tests are passing, and I'm testing against my Absinthe server as well.

@leszekhanusz
Collaborator

Thanks for your help!

As you have seen, I modified the code so that _remove_listener can only be called in the subscribe method of the transport (same as the websockets transport).
This is because the reception of the Phoenix messages and the consumption of those messages are done in separate tasks. In some cases, if the consumer is slow, you might need to consume the previous messages first before stopping the async generator cleanly.
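
The split between a receiving task and a consuming generator can be illustrated with a minimal asyncio sketch. This is not the gql implementation: the `CLOSE` sentinel, the `receiver` coroutine, and the simplified `subscribe` generator are hypothetical stand-ins, assumed here only to show why cleanup belongs in the consumer.

```python
import asyncio

CLOSE = object()  # hypothetical sentinel marking the end of the stream


async def receiver(queue: asyncio.Queue) -> None:
    """Stand-in for the transport's receive task: pushes messages as they arrive."""
    for i in range(3):
        await queue.put({"result": i})
    # Assumption: the server's reply to the unsubscribe request arrives last.
    await queue.put(CLOSE)


async def subscribe(queue: asyncio.Queue):
    """Stand-in for the subscribe generator: drains the queue in order."""
    try:
        while True:
            message = await queue.get()
            if message is CLOSE:
                break
            yield message
    finally:
        # Cleanup runs in the consumer, after every queued message was read.
        print("listener removed")


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(receiver(queue))
    seen = []
    async for message in subscribe(queue):
        await asyncio.sleep(0.01)  # simulate a slow consumer
        seen.append(message["result"])
    await task
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If the receiver removed the listener as soon as the closing reply arrived, a slow consumer could lose the messages still waiting in the queue; doing the cleanup in the generator's `finally` avoids that.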

I actually found the real bug you had at first, which caused break not to work properly (even for the websockets transport).
Because of the Python 3.6 bug, I had to keep a _generator reference in the session. If you do a break in a session.subscribe, and you don't keep a reference to this generator, then a GeneratorExit will be generated only for the outer generator.
Now with PR #230, it should work correctly and we will catch this and run aclose in the gql library when we detect that the outer async generator is ending.
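
The pattern of keeping a reference to the inner generator and closing it when the outer one ends can be sketched in plain asyncio. The names `inner`, `outer_with_cleanup`, and `events` are hypothetical and this is not the code from PR #230; it only assumes standard async generator semantics.

```python
import asyncio

events = []


async def inner():
    """Stand-in for the transport-level subscribe generator."""
    try:
        for i in range(10):
            yield i
    finally:
        events.append("inner closed")


async def outer_with_cleanup():
    """Keeps a reference to inner() and closes it when the outer generator ends."""
    gen = inner()
    try:
        async for item in gen:
            yield item
    finally:
        # Awaiting is allowed while handling GeneratorExit; yielding is not.
        await gen.aclose()
        events.append("outer closed")


async def main() -> list:
    events.clear()
    gen = outer_with_cleanup()
    async for item in gen:
        if item == 1:
            break  # leaves the loop early; the generator is still suspended
    # Closing the outer generator raises GeneratorExit inside it,
    # and its finally block forwards the close to the inner generator.
    await gen.aclose()
    return list(events)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without the explicit `gen.aclose()` in the `finally` block, the inner generator would only be finalized later, when the event loop gets around to it.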

@leszekhanusz
Collaborator

Also, about your proposal of a TransportSubscriptionError exception, I suggest that we instead ignore this message and just log it as an error.
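
A log-and-ignore approach could look roughly like the sketch below. The function `handle_unsubscribe_reply`, the logger name, and the dictionary of listeners (each modelled as a plain list) are assumptions made for illustration, not the transport's actual code.

```python
import logging

log = logging.getLogger("phoenix_sketch")  # hypothetical logger name


def handle_unsubscribe_reply(listeners: dict, subscription_id: str) -> bool:
    """Return True if a listener was notified, False if the reply was ignored."""
    if subscription_id not in listeners:
        # Log instead of raising, so one stray reply does not fail the transport.
        log.error("Ignoring unsubscribe reply for unknown subscription %r", subscription_id)
        return False
    # Signal completion; the listener itself is removed later by subscribe.
    listeners[subscription_id].append("complete")
    return True
```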

@pzingg
Contributor Author

pzingg commented Aug 17, 2021 via email

pzingg and others added 3 commits August 16, 2021 19:07
The async generator will end properly and
we will remove the listener inside the subscribe method
@leszekhanusz
Collaborator

Note: you can see the full picture using -s with pytest

pytest tests/test_phoenix_channel_subscription.py::test_phoenix_channel_subscription_no_break -s

@leszekhanusz
Collaborator

Coverage is now fixed, but I really don't like the fact that you now need to use get_operation_ast in subscribe to save the operation in the Listener.
This is added work for the existing websockets transport without any benefit.
I think we should remove this. Thoughts?

@pzingg
Contributor Author

pzingg commented Aug 17, 2021

I don't know anything about the plain websocket protocol, but in Phoenix Channels, the replies from subscription docs are completely distinct from queries or mutations. We could let go of using get_operation_ast, and let the reply determine what we know about the request, but I think it's a stronger protocol to match the expected reply response content ("subscriptionId" for subscriptions, "data" and/or "errors" for queries and mutations) against the operation type.

Since the "subscribe" code has already called "gql.gql", the hard work of building the AST is already paid for; "get_operation_ast" is just a quick traversal of the parsed nodes: https://github.com/graphql-python/graphql-core/blob/main/src/graphql/utilities/get_operation_ast.py
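
The matching described above, checking the reply content against the operation type, can be sketched as a small pure-Python function. `reply_matches_operation` is a hypothetical helper, and it assumes the operation type is already available as a string ("query", "mutation", or "subscription"), for example from the parsed document.

```python
def reply_matches_operation(operation_type: str, response: dict) -> bool:
    """Check that a reply payload has the keys expected for the operation type."""
    if operation_type == "subscription":
        # A subscription reply carries the id used for later unsubscribes.
        return "subscriptionId" in response
    # Queries and mutations reply with "data" and/or "errors".
    return "data" in response or "errors" in response
```

For instance, a reply containing only "subscriptionId" would be rejected for a query, which is the kind of protocol mismatch this check is meant to surface.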

You may have noticed that I split the tests into those that send queries and those that send subscriptions. One test I would like to add (but don't have time this week) is to create a session with a PhoenixChannelWebsocketsTransport, and run an async that has two separate queries in it (ideally one a mutation and the other a subscription to the mutation results). I have a working example of this in a separate client project. It seems to work now, removing the listeners appropriately as they are unsubscribed, etc. But it would be nice to have a test server that can handle both types of replies concurrently. In the vanilla websockets tests, there is "test_websocket_multiple_connections_in_parallel", but I don't see a test that does a single session connection with multiple queries in parallel.

Is there an issue to create these types of tests already, or should I open one?

Thanks for fixing the "unsubscribe" vs "complete" answer type.

I'm just going to do some live testing on my Absinthe/Phoenix Channel server to send back some error responses to make sure they are parsed correctly. I will also spend some time looking at this: https://github.com/easco/absinthe_apollo_sockets, so I get a better understanding of how the vanilla websockets differ for my own edification. But I'll stay out of the code!

@leszekhanusz
Collaborator

Another way to distinguish between query/mutation and subscription operations is that, with queries and mutations, the user is supposed to use the execute method, which will set send_stop to False in the listener. With subscriptions, the user should use the subscribe method, which will set send_stop to True. But I really don't see the point of checking this.

As for the tests with concurrent queries on the same connection, some of them already exist with other transports:

@pzingg
Contributor Author

pzingg commented Aug 17, 2021

Yes. When I split the tests into test_phoenix_channel_query.py and test_phoenix_channel_exception.py, I used "execute" for queries and "subscribe" for subscriptions.

As far as servers go, I took the absinthe_apollo_socket code and built a Phoenix Channel server transport that has the same GraphQL schema and backend (a simple counter with a query, reset and increment mutations, and a subscription) as the Apollo websocket transport. Connecting to both backends with the code in the PR works as expected when concurrently running a series of mutation executions and a long-running subscription, for both the PhoenixChannel and the plain Websocket transports. And I can test some error responses also. So I'm happy with things.

@leszekhanusz leszekhanusz merged commit 20ae2e2 into graphql-python:master Aug 22, 2021
@leszekhanusz
Collaborator

Thanks for the PR!

Successfully merging this pull request may close these issues.

Phoenix transport: breaking early from async generator loop requires canceling receive_data_task
3 participants